{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Scale Scikit-Learn for Small Data Problems\n",
    "==========================================\n",
    "\n",
    "This example demonstrates how Dask can scale scikit-learn to a cluster of machines for a CPU-bound problem.\n",
    "We'll fit a large model, a grid-search over many hyper-parameters, on a small dataset.\n",
    "\n",
    "This video talks demonstrates the same example on a larger cluster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "keep_output": true
   },
   "outputs": [],
   "source": [
    "from IPython.display import IFrame\n",
    "\n",
    "IFrame(\"https://www.youtube.com/embed/5Zf6DQaf7jk\", 560, 315, frameborder=\"0\", allow=\"autoplay; encrypted-media\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client, progress\n",
    "client = Client(n_workers=4, threads_per_worker=1, memory_limit='2GB')\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "keep_output": true
   },
   "source": [
    "## Distributed Training\n",
    "\n",
    "<img src=\"../images/scikit-learn-logo-notext.png\"/> <img src=\"../images/joblib_logo.svg\" width=\"20%\"/> \n",
    "\n",
    "Scikit-learn uses [joblib](http://joblib.readthedocs.io/) for single-machine parallelism. This lets you train most estimators (anything that accepts an `n_jobs` parameter) using all the cores of your laptop or workstation.\n",
    "\n",
    "Alternatively, Scikit-Learn can use Dask for parallelism.  This lets you train those estimators using all the cores of your *cluster* without significantly changing your code.\n",
    "\n",
    "This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For too small datasets, training times will typically be small enough that cluster-wide parallelism isn't helpful. For too large datasets (larger than a single machine's memory), the scikit-learn estimators may not be able to cope (though Dask-ML provides other ways for working with larger than memory datasets)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create Scikit-Learn Pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pprint import pprint\n",
    "from time import time\n",
    "import logging\n",
    "\n",
    "from sklearn.datasets import fetch_20newsgroups\n",
    "from sklearn.feature_extraction.text import HashingVectorizer\n",
    "from sklearn.feature_extraction.text import TfidfTransformer\n",
    "from sklearn.linear_model import SGDClassifier\n",
    "from sklearn.model_selection import GridSearchCV\n",
    "from sklearn.pipeline import Pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Scale Up: set categories=None to use all the categories\n",
    "categories = [\n",
    "    'alt.atheism',\n",
    "    'talk.religion.misc',\n",
    "]\n",
    "\n",
    "print(\"Loading 20 newsgroups dataset for categories:\")\n",
    "print(categories)\n",
    "\n",
    "data = fetch_20newsgroups(subset='train', categories=categories)\n",
    "print(\"%d documents\" % len(data.filenames))\n",
    "print(\"%d categories\" % len(data.target_names))\n",
    "print()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We'll define a small pipeline that combines text feature extraction with a simple classifier."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline = Pipeline([\n",
    "    ('vect', HashingVectorizer()),\n",
    "    ('tfidf', TfidfTransformer()),\n",
    "    ('clf', SGDClassifier(max_iter=1000)),\n",
    "])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Define Grid for Parameter Search"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Grid search over some parameters."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "parameters = {\n",
    "    'tfidf__use_idf': (True, False),\n",
    "    'tfidf__norm': ('l1', 'l2'),\n",
    "    'clf__alpha': (0.00001, 0.000001),\n",
    "    # 'clf__penalty': ('l2', 'elasticnet'),\n",
    "    # 'clf__n_iter': (10, 50, 80),\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1, verbose=1, cv=3, refit=False, iid=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To fit this normally, we would write\n",
    "\n",
    "\n",
    "```python\n",
    "grid_search.fit(data.data, data.target)\n",
    "```\n",
    "\n",
    "That would use the default joblib backend (multiple processes) for parallelism.\n",
    "To use the Dask distributed backend, which will use a cluster of machines to train the model, perform the fit in a `parallel_backend` context."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import joblib\n",
    "\n",
    "with joblib.parallel_backend('dask'):\n",
    "    grid_search.fit(data.data, data.target)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you had your distributed dashboard open during that fit, you'll notice that each worker performs some of the fit tasks."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parallel, Distributed Prediction\n",
    "\n",
    "Sometimes, you're train on a small dataset, but need to predict for a much larger batch of data.\n",
    "In this case, you'd like your estimator to handle NumPy arrays and pandas DataFrames for training, and dask arrays or DataFrames for prediction. [`dask_ml.wrappers.ParallelPostFit`](http://ml.dask.org/modules/generated/dask_ml.wrappers.ParallelPostFit.html#dask_ml.wrappers.ParallelPostFit) provides exactly that. It's a meta-estimator. It does nothing during training; the underlying estimator (probably a scikit-learn estimator) will probably be in-memory on a single machine. But tasks like `predict`, `score`, etc. are parallelized and distributed.\n",
    "\n",
    "Most of the time, using `ParallelPostFit` is as simple as wrapping the original estimator.\n",
    "When used inside a GridSearch, you'll need to update the keys of the parameters, just like with any meta-estimator.\n",
    "The only complication comes when using `ParallelPostFit` with another meta-estimator like `GridSearchCV`. In this case, you'll need to prefix your parameter names with `estimator__`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.datasets import load_digits\n",
    "from sklearn.svm import SVC\n",
    "from dask_ml.wrappers import ParallelPostFit"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We'll load the small NumPy arrays for training."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "X, y = load_digits(return_X_y=True)\n",
    "X.shape"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "svc = ParallelPostFit(SVC(random_state=0, gamma='scale'))\n",
    "\n",
    "param_grid = {\n",
    "    # use estimator__param instead of param\n",
    "    'estimator__C': [0.01, 1.0, 10],\n",
    "}\n",
    "\n",
    "grid_search = GridSearchCV(svc, param_grid, iid=False, cv=3)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And fit as usual."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "grid_search.fit(X, y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We'll simulate a large dask array by replicating the training data a few times.\n",
    "In reality, you would load this from your file system."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask.array as da"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "big_X = da.concatenate([\n",
    "    da.from_array(X, chunks=X.shape)\n",
    "    for _ in range(10)\n",
    "])\n",
    "big_X"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Operations like `predict`, or `predict_proba` return dask, rather than NumPy arrays.\n",
    "When you compute, the work will be done in parallel, out of core or distributed on the cluster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "predicted = grid_search.predict(big_X)\n",
    "predicted"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "At this point predicted could be written to disk, or aggregated before returning to the client."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
